{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%session_id_prefix native-delta-sql-s3path\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%%configure \n",
    "{\n",
    "  \"--conf\": \"spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n",
    "  \"--datalake-formats\": \"delta\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# User settings: replace both placeholders before running the notebook.\n",
    "bucket_name = \"<Your S3 bucket name>\"\n",
    "bucket_prefix = \"<Your S3 bucket prefix>\"\n",
    "\n",
    "# Fixed names used by this sample.\n",
    "database_name = \"delta_sql_s3path\"\n",
    "table_name = \"products\"\n",
    "\n",
    "# Derived S3 key prefixes (no trailing slash) and s3:// URIs (with trailing slash).\n",
    "database_prefix = \"/\".join((bucket_prefix, database_name))\n",
    "table_prefix = \"/\".join((database_prefix, table_name))\n",
    "database_location = \"s3://\" + bucket_name + \"/\" + database_prefix + \"/\"\n",
    "table_location = \"s3://\" + bucket_name + \"/\" + table_prefix + \"/\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Clean up existing resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import boto3\n",
    "\n",
    "# Delete every S3 object stored under the table prefix so that the\n",
    "# table written later in this notebook starts from an empty location.\n",
    "s3 = boto3.resource('s3')\n",
    "bucket = s3.Bucket(bucket_name)\n",
    "stale_objects = bucket.objects.filter(Prefix=table_prefix + \"/\")\n",
    "stale_objects.delete()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Delta table with sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "import time\n",
    "\n",
    "# Current epoch time in seconds; every seed row shares it as `updated_at`.\n",
    "ut = time.time()\n",
    "\n",
    "product = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n",
    "]\n",
    "\n",
    "# Turn each dict into a Row, then build the DataFrame from the list of Rows.\n",
    "product_rows = [Row(**record) for record in product]\n",
    "df_products = spark.createDataFrame(product_rows)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Write the sample rows in Delta format to `table_location`, in overwrite mode.\n",
    "(\n",
    "    df_products.write\n",
    "    .format(\"delta\")\n",
    "    .mode(\"overwrite\")\n",
    "    .save(table_location)\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read from Delta Lake table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Insert records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "query=f\"\"\"INSERT INTO delta.`s3://{bucket_name}/{table_prefix}` VALUES('00006', 'Pen', 30,'Stationery',{ut}), ('00007', 'Book', 500,'Stationery',{ut})\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Update records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "query=f\"\"\"UPDATE  delta.`s3://{bucket_name}/{table_prefix}` SET price=300, updated_at={ut} WHERE product_id == '00007'\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Upsert records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "product_updates = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'updated_at': ut}, # Update\n",
    "    {'product_id': '00008', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'updated_at': ut} # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)\n",
    "\n",
    "df_product_updates.createOrReplaceTempView(\"tmp_products_updates\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"MERGE INTO delta.`s3://{bucket_name}/{table_prefix}` AS old \n",
    "USING tmp_products_updates AS new \n",
    "ON old.product_id=new.product_id \n",
    "WHEN MATCHED THEN \n",
    "UPDATE SET \n",
    "    old.product_name=new.product_name,\n",
    "    old.price=new.price,\n",
    "    old.category=new.category,\n",
    "    old.updated_at=new.updated_at\n",
    "WHEN NOT MATCHED \n",
    "THEN INSERT (product_id, product_name, price,category,updated_at) \n",
    "VALUES ( \n",
    "    new.product_id, \n",
    "    new.product_name, \n",
    "    new.price, \n",
    "    new.category, \n",
    "    new.updated_at \n",
    ")\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Alter Delta Lake table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"ALTER TABLE delta.`s3://{bucket_name}/{table_prefix}` ADD COLUMNS (CURRENCY STRING AFTER PRICE)\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query=f\"\"\"UPDATE  delta.`s3://{bucket_name}/{table_prefix}` SET CURRENCY =\"INR\" \"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Delete records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"DELETE FROM delta.`s3://{bucket_name}/{table_prefix}` WHERE product_name == \"Pen\" \"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## View History"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"DESCRIBE HISTORY delta.`s3://{bucket_name}/{table_prefix}` \"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Query with time travel"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Time travel by version number: the `@v5` suffix on the table path reads table version 5.\n",
    "# (The wildcard `from delta import *` was dropped: this notebook only uses Spark SQL\n",
    "# and never references a name from the `delta` Python package.)\n",
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@v5`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
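    "You need a timestamp value in yyyyMMddHHmmssSSS format. Replace the timestamp in the following cell with your own value (for example, one taken from the `timestamp` column of the `DESCRIBE HISTORY` output above)."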
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@20220414145923000`\"\"\" #by passing the timestamp in yyyyMMddHHmmssSSS format to the path\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note: as of now, Spark SQL does not support the `TIMESTAMP AS OF` / `VERSION AS OF` keywords for time travel, so the `@` suffix on the table path is used instead. See https://github.com/delta-io/delta/issues/128 and https://issues.apache.org/jira/browse/SPARK-34978."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Roll Back"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Fix the accidental delete of the 'Pen' row (product_id '00006') by re-inserting it\n",
    "# from table version 5. When the notebook is run top to bottom, version 5 is the last\n",
    "# version written before the DELETE above.\n",
    "# product_id is stored as a string, so compare against the quoted literal '00006'\n",
    "# rather than relying on an implicit string-to-number cast.\n",
    "query = f\"\"\"INSERT INTO delta.`s3://{bucket_name}/{table_prefix}`\n",
    "  SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}@v5`\n",
    "  WHERE product_id = '00006'\"\"\"\n",
    "\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "##Fix accidental updates to product_id 00005. First we update the price of product_id 00005\n",
    "\n",
    "ut=time.time()\n",
    "query=f\"\"\"UPDATE delta.`s3://{bucket_name}/{table_prefix}` SET price=100,updated_at={ut} WHERE product_id == '00005'\"\"\"\n",
    "\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"DESCRIBE HISTORY delta.`s3://{bucket_name}/{table_prefix}` \"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Roll Back the update just made\n",
    "query=f\"\"\"MERGE INTO delta.`s3://{bucket_name}/{table_prefix}` dest\n",
    "USING delta.`s3://{bucket_name}/{table_prefix}@v7` src\n",
    "ON src.product_id = dest.product_id\n",
    "WHEN MATCHED THEN UPDATE SET * \"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"SELECT * FROM delta.`s3://{bucket_name}/{table_prefix}`\"\"\"\n",
    "spark.sql(query).show(truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Stop Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
